package org.hope.lee.consumer.exception;

import java.util.List;

import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import com.alibaba.rocketmq.client.consumer.listener.MessageListener;
import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere;
import com.alibaba.rocketmq.common.message.MessageExt;
import com.sun.org.apache.xpath.internal.SourceTree;

/**
 * 消费端重试的情况 :异常情况
 */
public class ConsumerException {
	public static void main(String[] args) throws MQClientException {
		DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("push_consumer");
		consumer.setNamesrvAddr("192.168.31.176:9876;192.168.31.165:9876");
		// 批量消费,每次拉取10条
		consumer.setConsumeMessageBatchMaxSize(10);// ①
		// consumer.setInstanceName("quick_start_consumer");
		// 3.2.6这个版本没有这个方法，3.5.3版本要设置这个方法为false,否则取不到topic
		// consumer.setVipChannelEnabled(false);

		// 程序第一次启动从消息队列头取数据
		// 如果非第一次启动，那么按照上次消费的位置继续消费
		consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
		// 订阅PushTopic下Tag为push的消息
		consumer.subscribe("PushTopic_tt1", "*");
		consumer.registerMessageListener(new MessageListenerConcurrently() {
			public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
				
				MessageExt msg = msgs.get(0);
				try {
					String topic = msg.getTopic();
					String msgBody = new String(msg.getBody(), "utf-8");
					String tags = msg.getTags();
					System.out.println("收到消息:topic:" + topic + ",tags:" + tags + ",msg:" + msg + "msgBody:" + msgBody);
					if ("message4".equals(msgBody)) {
						System.out.println("====失败消息开始=====");
						System.out.println("msg:" + msg);
						System.out.println("msgBody:" + msgBody);
						System.out.println("====失败消息结束=====");
						// 发生异常
						int i = 1 / 0;
						System.out.println(i);
					}
				} catch (Exception e) {
					e.printStackTrace();
					// 如果重试了三次就返回成功
					if (msg.getReconsumeTimes() == 3) {
						return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
					}
					return ConsumeConcurrentlyStatus.RECONSUME_LATER;
				}

				return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
			}
		});
		consumer.start();
		System.out.println("Consumer Started.");
		// consumer.suspend();

	}
}
